import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import java.util.Arrays;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;

/**
 * Demo job that runs the same grouped {@code flatAggregate} query three times, each time
 * invoking the {@code Top2} function (declared elsewhere in the project) in a different way:
 * inline by class, inline by class with aliased result columns, and by registered name.
 * Each result is converted to a retract stream and printed.
 */
public class TableAggregateFunctions {

    /**
     * Builds the three queries over an in-memory collection of orders and executes the job.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Fixed in-memory input, exposed to the Table API under the name "MyTable".
        DataStream<Order2> orders = env.fromCollection(Arrays.asList(
                new Order2(1, "Latte", 6),
                new Order2(2, "Milk", 3),
                new Order2(3, "Breve", 5),
                new Order2(4, "Mocha", 8),
                new Order2(5, "Tea", 4)
        ));
        tEnv.createTemporaryView("MyTable", orders);

        // NOTE(review): every query below groups by "price" and also aggregates "price",
        // so each group only ever contains a single distinct price - confirm this is intended.

        // Variant 1: call the function "inline" by class, without registering it.
        // The function yields the two largest values, so the un-aliased result columns
        // are referenced by their default names f0 and f1.
        Table inlineDefaultNames = tEnv
                .from("MyTable")
                .groupBy($("price"))
                .flatAggregate(call(Top2.class, $("price")))
                .select($("price"), $("f0"), $("f1"));

        // Variant 2: same inline call, but the two result fields are aliased
        // to "value" and "rank" for more readable column names.
        Table inlineAliased = tEnv
                .from("MyTable")
                .groupBy($("price"))
                .flatAggregate(call(Top2.class, $("price")).as("value", "rank"))
                .select($("price"), $("value"), $("rank"));

        // Variant 3: register the function under a name first, then call it by that name.
        tEnv.createTemporarySystemFunction("Top2", Top2.class);
        Table registeredAliased = tEnv
                .from("MyTable")
                .groupBy($("price"))
                .flatAggregate(call("Top2", $("price")).as("value", "rank"))
                .select($("price"), $("value"), $("rank"));

        // Print each result as a retract stream, in the same order the queries were defined.
        for (Table query : Arrays.asList(inlineDefaultNames, inlineAliased, registeredAliased)) {
            tEnv.toRetractStream(query, Row.class).print();
        }

        env.execute();
    }
}